/*
 * Copyright (c) 2020, Armink, <armink.ztl@gmail.com>
 *
 * SPDX-License-Identifier: Apache-2.0
 */

/**
 * @file
 * @brief TSDB feature.
 *
 * Time series log (like TSDB) feature implement source file.
 *
 * TSL is time series log, the TSDB saved many TSLs.
 */

#include <inttypes.h>
#include <string.h>
#include "fdb_low_lvl.h"
#include "flashdb.h"

#define FDB_LOG_TAG "[tsl]"
/* rewrite log prefix */
#undef FDB_LOG_PREFIX2
#define FDB_LOG_PREFIX2() \
    FDB_PRINT("[%s][%s] ", db_name(db), _fdb_db_path((fdb_db_t)db))

#if defined(FDB_USING_TSDB)

#if (FDB_WRITE_GRAN == 64)
#error "Flash 64 bits write granularity is not supported in TSDB yet!"
#endif

/* magic word(`T`, `S`, `L`, `0`) */
#define SECTOR_MAGIC_WORD 0x304C5354

#define TSL_STATUS_TABLE_SIZE FDB_STATUS_TABLE_SIZE(FDB_TSL_STATUS_NUM)

#define SECTOR_HDR_DATA_SIZE (FDB_WG_ALIGN(sizeof(struct sector_hdr_data)))
#define LOG_IDX_DATA_SIZE (FDB_WG_ALIGN(sizeof(struct log_idx_data)))
#define LOG_IDX_TS_OFFSET ((unsigned long)(&((struct log_idx_data*)0)->time))
#define SECTOR_MAGIC_OFFSET \
    ((unsigned long)(&((struct sector_hdr_data*)0)->magic))
#define SECTOR_START_TIME_OFFSET \
    ((unsigned long)(&((struct sector_hdr_data*)0)->start_time))
#define SECTOR_END0_TIME_OFFSET \
    ((unsigned long)(&((struct sector_hdr_data*)0)->end_info[0].time))
#define SECTOR_END0_IDX_OFFSET \
    ((unsigned long)(&((struct sector_hdr_data*)0)->end_info[0].index))
#define SECTOR_END0_STATUS_OFFSET \
    ((unsigned long)(&((struct sector_hdr_data*)0)->end_info[0].status))
#define SECTOR_END1_TIME_OFFSET \
    ((unsigned long)(&((struct sector_hdr_data*)0)->end_info[1].time))
#define SECTOR_END1_IDX_OFFSET \
    ((unsigned long)(&((struct sector_hdr_data*)0)->end_info[1].index))
#define SECTOR_END1_STATUS_OFFSET \
    ((unsigned long)(&((struct sector_hdr_data*)0)->end_info[1].status))

/* the next address is get failed */
#define FAILED_ADDR 0xFFFFFFFF

#define db_name(db) (((fdb_db_t)db)->name)
#define db_init_ok(db) (((fdb_db_t)db)->init_ok)
#define db_sec_size(db) (((fdb_db_t)db)->sec_size)
#define db_max_size(db) (((fdb_db_t)db)->max_size)
#define db_oldest_addr(db) (((fdb_db_t)db)->oldest_addr)

#define db_lock(db)                             \
    do {                                        \
        if (((fdb_db_t)db)->lock)               \
            ((fdb_db_t)db)->lock((fdb_db_t)db); \
    } while (0);

#define db_unlock(db)                             \
    do {                                          \
        if (((fdb_db_t)db)->unlock)               \
            ((fdb_db_t)db)->unlock((fdb_db_t)db); \
    } while (0);

#define _FDB_WRITE_STATUS(db, addr, status_table, status_num, status_index, \
                          sync)                                             \
    do {                                                                    \
        result = _fdb_write_status((fdb_db_t)db, addr, status_table,        \
                                   status_num, status_index, sync);         \
        if (result != FDB_NO_ERR)                                           \
            return result;                                                  \
    } while (0);

#define FLASH_WRITE(db, addr, buf, size, sync)                          \
    do {                                                                \
        result = _fdb_flash_write((fdb_db_t)db, addr, buf, size, sync); \
        if (result != FDB_NO_ERR)                                       \
            return result;                                              \
    } while (0);

struct sector_hdr_data {
    uint8_t status[FDB_STORE_STATUS_TABLE_SIZE]; /**< sector store status @see
                                                    fdb_sector_store_status_t */
    uint32_t magic;        /**< magic word(`T`, `S`, `L`, `0`) */
    fdb_time_t start_time; /**< the first start node's timestamp */
    struct {
        fdb_time_t time; /**< the last end node's timestamp */
        uint32_t index;  /**< the last end node's index */
        uint8_t status[TSL_STATUS_TABLE_SIZE]; /**< end node status, @see
                                                  fdb_tsl_status_t */
    } end_info[2];
    uint32_t reserved;
};
typedef struct sector_hdr_data* sector_hdr_data_t;

/* time series log node index data */
struct log_idx_data {
    uint8_t status_table[TSL_STATUS_TABLE_SIZE]; /**< node status, @see
                                                    fdb_tsl_status_t */
    fdb_time_t time;                             /**< node timestamp */
    uint32_t log_len; /**< node total length (header + name + value), must align
                         by FDB_WRITE_GRAN */
    uint32_t log_addr; /**< node address */
};
typedef struct log_idx_data* log_idx_data_t;

struct query_count_args {
    fdb_tsl_status_t status;
    size_t count;
};

struct check_sec_hdr_cb_args {
    fdb_tsdb_t db;
    pika_bool check_failed;
    size_t empty_num;
    uint32_t empty_addr;
};

static fdb_err_t read_tsl(fdb_tsdb_t db, fdb_tsl_t tsl) {
    struct log_idx_data idx;
    /* read TSL index raw data */
    _fdb_flash_read((fdb_db_t)db, tsl->addr.index, (uint32_t*)&idx,
                    sizeof(struct log_idx_data));
    tsl->status =
        (fdb_tsl_status_t)_fdb_get_status(idx.status_table, FDB_TSL_STATUS_NUM);
    if ((tsl->status == FDB_TSL_PRE_WRITE) || (tsl->status == FDB_TSL_UNUSED)) {
        tsl->log_len = db->max_len;
        tsl->addr.log = FDB_DATA_UNUSED;
        tsl->time = 0;
    } else {
        tsl->log_len = idx.log_len;
        tsl->addr.log = idx.log_addr;
        tsl->time = idx.time;
    }

    return FDB_NO_ERR;
}

static uint32_t get_next_sector_addr(fdb_tsdb_t db,
                                     tsdb_sec_info_t pre_sec,
                                     uint32_t traversed_len) {
    if (traversed_len + db_sec_size(db) <= db_max_size(db)) {
        if (pre_sec->addr + db_sec_size(db) < db_max_size(db)) {
            return pre_sec->addr + db_sec_size(db);
        } else {
            /* the next sector is on the top of the database */
            return 0;
        }
    } else {
        /* finished */
        return FAILED_ADDR;
    }
}

static uint32_t get_next_tsl_addr(tsdb_sec_info_t sector, fdb_tsl_t pre_tsl) {
    uint32_t addr = FAILED_ADDR;

    if (sector->status == FDB_SECTOR_STORE_EMPTY) {
        return FAILED_ADDR;
    }

    if (pre_tsl->addr.index + LOG_IDX_DATA_SIZE <= sector->end_idx) {
        addr = pre_tsl->addr.index + LOG_IDX_DATA_SIZE;
    } else {
        /* no TSL */
        return FAILED_ADDR;
    }

    return addr;
}

static uint32_t get_last_tsl_addr(tsdb_sec_info_t sector, fdb_tsl_t pre_tsl) {
    uint32_t addr = FAILED_ADDR;

    if (sector->status == FDB_SECTOR_STORE_EMPTY) {
        return FAILED_ADDR;
    }

    if (pre_tsl->addr.index >=
        (sector->addr + SECTOR_HDR_DATA_SIZE + LOG_IDX_DATA_SIZE)) {
        addr = pre_tsl->addr.index - LOG_IDX_DATA_SIZE;
    } else {
        return FAILED_ADDR;
    }

    return addr;
}

static uint32_t get_last_sector_addr(fdb_tsdb_t db,
                                     tsdb_sec_info_t pre_sec,
                                     uint32_t traversed_len) {
    if (traversed_len + db_sec_size(db) <= db_max_size(db)) {
        if (pre_sec->addr >= db_sec_size(db)) {
            /* the next sector is previous sector */
            return pre_sec->addr - db_sec_size(db);
        } else {
            /* the next sector is the last sector */
            return db_max_size(db) - db_sec_size(db);
        }
    } else {
        return FAILED_ADDR;
    }
}

static fdb_err_t read_sector_info(fdb_tsdb_t db,
                                  uint32_t addr,
                                  tsdb_sec_info_t sector,
                                  pika_bool traversal) {
    fdb_err_t result = FDB_NO_ERR;
    struct sector_hdr_data sec_hdr;

    FDB_ASSERT(sector);

    /* read sector header raw data */
    _fdb_flash_read((fdb_db_t)db, addr, (uint32_t*)&sec_hdr,
                    sizeof(struct sector_hdr_data));

    sector->addr = addr;
    sector->magic = sec_hdr.magic;

    /* check magic word */
    if (sector->magic != SECTOR_MAGIC_WORD) {
        sector->check_ok = pika_false;
        return FDB_INIT_FAILED;
    }
    sector->check_ok = pika_true;
    sector->status = (fdb_sector_store_status_t)_fdb_get_status(
        sec_hdr.status, FDB_SECTOR_STORE_STATUS_NUM);
    sector->start_time = sec_hdr.start_time;
    sector->end_info_stat[0] = (fdb_tsl_status_t)_fdb_get_status(
        sec_hdr.end_info[0].status, FDB_TSL_STATUS_NUM);
    sector->end_info_stat[1] = (fdb_tsl_status_t)_fdb_get_status(
        sec_hdr.end_info[1].status, FDB_TSL_STATUS_NUM);
    if (sector->end_info_stat[0] == FDB_TSL_WRITE) {
        sector->end_time = sec_hdr.end_info[0].time;
        sector->end_idx = sec_hdr.end_info[0].index;
    } else if (sector->end_info_stat[1] == FDB_TSL_WRITE) {
        sector->end_time = sec_hdr.end_info[1].time;
        sector->end_idx = sec_hdr.end_info[1].index;
    } else if (sector->end_info_stat[0] == FDB_TSL_PRE_WRITE &&
               sector->end_info_stat[1] == FDB_TSL_PRE_WRITE) {
        // TODO There is no valid end node info on this sector, need impl fast
        // query this sector by fdb_tsl_iter_by_time
        FDB_ASSERT(0);
    }
    /* traversal all TSL and calculate the remain space size */
    sector->empty_idx = sector->addr + SECTOR_HDR_DATA_SIZE;
    sector->empty_data = sector->addr + db_sec_size(db);
    /* the TSL's data is saved from sector bottom, and the TSL's index saved
     * from the sector top */
    sector->remain = sector->empty_data - sector->empty_idx;
    if (sector->status == FDB_SECTOR_STORE_USING && traversal) {
        struct fdb_tsl tsl;

        tsl.addr.index = sector->empty_idx;
        while (read_tsl(db, &tsl) == FDB_NO_ERR) {
            if (tsl.status == FDB_TSL_UNUSED) {
                break;
            }
            sector->end_time = tsl.time;
            sector->end_idx = tsl.addr.index;
            sector->empty_idx += LOG_IDX_DATA_SIZE;
            sector->empty_data -= FDB_WG_ALIGN(tsl.log_len);
            tsl.addr.index += LOG_IDX_DATA_SIZE;
            if (sector->remain >
                LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(tsl.log_len)) {
                sector->remain -=
                    (LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(tsl.log_len));
            } else {
                FDB_INFO("Error: this TSL (0x%08" PRIX32 ") size (%" PRIu32
                         ") is out of bound.\n",
                         tsl.addr.index, tsl.log_len);
                sector->remain = 0;
                result = FDB_READ_ERR;
                break;
            }
        }
    }

    return result;
}

static fdb_err_t format_sector(fdb_tsdb_t db, uint32_t addr) {
    fdb_err_t result = FDB_NO_ERR;
    struct sector_hdr_data sec_hdr;

    FDB_ASSERT(addr % db_sec_size(db) == 0);

    result = _fdb_flash_erase((fdb_db_t)db, addr, db_sec_size(db));
    if (result == FDB_NO_ERR) {
        _FDB_WRITE_STATUS(db, addr, sec_hdr.status, FDB_SECTOR_STORE_STATUS_NUM,
                          FDB_SECTOR_STORE_EMPTY, pika_true);
        /* set the magic */
        sec_hdr.magic = SECTOR_MAGIC_WORD;
        FLASH_WRITE(db, addr + SECTOR_MAGIC_OFFSET, &sec_hdr.magic,
                    sizeof(sec_hdr.magic), pika_true);
    }

    return result;
}

static void sector_iterator(fdb_tsdb_t db,
                            tsdb_sec_info_t sector,
                            fdb_sector_store_status_t status,
                            void* arg1,
                            void* arg2,
                            pika_bool (*callback)(tsdb_sec_info_t sector,
                                                  void* arg1,
                                                  void* arg2),
                            pika_bool traversal) {
    uint32_t sec_addr = sector->addr, traversed_len = 0;

    /* search all sectors */
    do {
        read_sector_info(db, sec_addr, sector, pika_false);
        if (status == FDB_SECTOR_STORE_UNUSED || status == sector->status) {
            if (traversal) {
                read_sector_info(db, sec_addr, sector, pika_true);
            }
            /* iterator is interrupted when callback return pika_true */
            if (callback && callback(sector, arg1, arg2)) {
                return;
            }
        }
        traversed_len += db_sec_size(db);
    } while ((sec_addr = get_next_sector_addr(db, sector, traversed_len)) !=
             FAILED_ADDR);
}

static fdb_err_t write_tsl(fdb_tsdb_t db, fdb_blob_t blob, fdb_time_t time) {
    fdb_err_t result = FDB_NO_ERR;
    struct log_idx_data idx;
    uint32_t idx_addr = db->cur_sec.empty_idx;

    idx.log_len = blob->size;
    idx.time = time;
    idx.log_addr = db->cur_sec.empty_data - FDB_WG_ALIGN(idx.log_len);
    /* write the status will by write granularity */
    _FDB_WRITE_STATUS(db, idx_addr, idx.status_table, FDB_TSL_STATUS_NUM,
                      FDB_TSL_PRE_WRITE, pika_false);
    /* write other index info */
    FLASH_WRITE(db, idx_addr + LOG_IDX_TS_OFFSET, &idx.time,
                sizeof(struct log_idx_data) - LOG_IDX_TS_OFFSET, pika_false);
    /* write blob data */
    FLASH_WRITE(db, idx.log_addr, blob->buf, blob->size, pika_false);
    /* write the status will by write granularity */
    _FDB_WRITE_STATUS(db, idx_addr, idx.status_table, FDB_TSL_STATUS_NUM,
                      FDB_TSL_WRITE, pika_true);

    return result;
}

static fdb_err_t update_sec_status(fdb_tsdb_t db,
                                   tsdb_sec_info_t sector,
                                   fdb_blob_t blob,
                                   fdb_time_t cur_time) {
    fdb_err_t result = FDB_NO_ERR;
    uint8_t status[FDB_STORE_STATUS_TABLE_SIZE];

    if (sector->status == FDB_SECTOR_STORE_USING &&
        sector->remain < LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(blob->size)) {
        uint8_t end_status[TSL_STATUS_TABLE_SIZE];
        uint32_t end_index = sector->empty_idx - LOG_IDX_DATA_SIZE,
                 new_sec_addr, cur_sec_addr = sector->addr;
        /* save the end node index and timestamp */
        if (sector->end_info_stat[0] == FDB_TSL_UNUSED) {
            _FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END0_STATUS_OFFSET,
                              end_status, FDB_TSL_STATUS_NUM, FDB_TSL_PRE_WRITE,
                              pika_false);
            FLASH_WRITE(db, cur_sec_addr + SECTOR_END0_TIME_OFFSET,
                        (uint32_t*)&db->last_time, sizeof(fdb_time_t),
                        pika_false);
            FLASH_WRITE(db, cur_sec_addr + SECTOR_END0_IDX_OFFSET, &end_index,
                        sizeof(end_index), pika_false);
            _FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END0_STATUS_OFFSET,
                              end_status, FDB_TSL_STATUS_NUM, FDB_TSL_WRITE,
                              pika_true);
        } else if (sector->end_info_stat[1] == FDB_TSL_UNUSED) {
            _FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END1_STATUS_OFFSET,
                              end_status, FDB_TSL_STATUS_NUM, FDB_TSL_PRE_WRITE,
                              pika_false);
            FLASH_WRITE(db, cur_sec_addr + SECTOR_END1_TIME_OFFSET,
                        (uint32_t*)&db->last_time, sizeof(fdb_time_t),
                        pika_false);
            FLASH_WRITE(db, cur_sec_addr + SECTOR_END1_IDX_OFFSET, &end_index,
                        sizeof(end_index), pika_false);
            _FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END1_STATUS_OFFSET,
                              end_status, FDB_TSL_STATUS_NUM, FDB_TSL_WRITE,
                              pika_true);
        }
        /* change current sector to full */
        _FDB_WRITE_STATUS(db, cur_sec_addr, status, FDB_SECTOR_STORE_STATUS_NUM,
                          FDB_SECTOR_STORE_FULL, pika_true);
        sector->status = FDB_SECTOR_STORE_FULL;
        /* calculate next sector address */
        if (sector->addr + db_sec_size(db) < db_max_size(db)) {
            new_sec_addr = sector->addr + db_sec_size(db);
        } else if (db->rollover) {
            new_sec_addr = 0;
        } else {
            /* not rollover */
            return FDB_SAVED_FULL;
        }
        read_sector_info(db, new_sec_addr, &db->cur_sec, pika_false);
        if (sector->status != FDB_SECTOR_STORE_EMPTY) {
            /* calculate the oldest sector address */
            if (new_sec_addr + db_sec_size(db) < db_max_size(db)) {
                db_oldest_addr(db) = new_sec_addr + db_sec_size(db);
            } else {
                db_oldest_addr(db) = 0;
            }
            format_sector(db, new_sec_addr);
            read_sector_info(db, new_sec_addr, &db->cur_sec, pika_false);
        }
    } else if (sector->status == FDB_SECTOR_STORE_FULL) {
        /* database full */
        return FDB_SAVED_FULL;
    }

    if (sector->status == FDB_SECTOR_STORE_EMPTY) {
        /* change the sector to using */
        sector->status = FDB_SECTOR_STORE_USING;
        sector->start_time = cur_time;
        _FDB_WRITE_STATUS(db, sector->addr, status, FDB_SECTOR_STORE_STATUS_NUM,
                          FDB_SECTOR_STORE_USING, pika_true);
        /* save the start timestamp */
        FLASH_WRITE(db, sector->addr + SECTOR_START_TIME_OFFSET,
                    (uint32_t*)&cur_time, sizeof(fdb_time_t), pika_true);
    }

    return result;
}

static fdb_err_t tsl_append(fdb_tsdb_t db, fdb_blob_t blob) {
    fdb_err_t result = FDB_NO_ERR;
    fdb_time_t cur_time = db->get_time();

    FDB_ASSERT(blob->size <= db->max_len);

    /* check the current timestamp, MUST more than the last save timestamp */
    if (cur_time <= db->last_time) {
        FDB_INFO("Warning: current timestamp (%" PRIdMAX
                 ") is less than or equal to the last save timestamp (%" PRIdMAX
                 "). This tsl will be dropped.\n",
                 (intmax_t)cur_time, (intmax_t)(db->last_time));
        return FDB_WRITE_ERR;
    }

    result = update_sec_status(db, &db->cur_sec, blob, cur_time);
    if (result != FDB_NO_ERR) {
        return result;
    }

    /* write the TSL node */
    result = write_tsl(db, blob, cur_time);
    if (result != FDB_NO_ERR) {
        return result;
    }

    /* recalculate the current using sector info */
    db->cur_sec.end_idx = db->cur_sec.empty_idx;
    db->cur_sec.end_time = cur_time;
    db->cur_sec.empty_idx += LOG_IDX_DATA_SIZE;
    db->cur_sec.empty_data -= FDB_WG_ALIGN(blob->size);
    db->cur_sec.remain -= LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(blob->size);
    db->last_time = cur_time;

    return result;
}

/**
 * Append a new log to TSDB.
 *
 * @param db database object
 * @param blob log blob data
 *
 * @return result
 */
fdb_err_t fdb_tsl_append(fdb_tsdb_t db, fdb_blob_t blob) {
    fdb_err_t result = FDB_NO_ERR;

    if (!db_init_ok(db)) {
        FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
        return FDB_INIT_FAILED;
    }

    db_lock(db);
    result = tsl_append(db, blob);
    db_unlock(db);

    return result;
}

/**
 * The TSDB iterator for each TSL.
 *
 * @param db database object
 * @param cb callback
 * @param arg callback argument
 */
void fdb_tsl_iter(fdb_tsdb_t db, fdb_tsl_cb cb, void* arg) {
    struct tsdb_sec_info sector;
    uint32_t sec_addr, traversed_len = 0;
    struct fdb_tsl tsl;

    if (!db_init_ok(db)) {
        FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
    }

    if (cb == NULL) {
        return;
    }

    sec_addr = db_oldest_addr(db);
    db_lock(db);
    /* search all sectors */
    do {
        traversed_len += db_sec_size(db);
        if (read_sector_info(db, sec_addr, &sector, pika_false) != FDB_NO_ERR) {
            continue;
        }
        /* sector has TSL */
        if (sector.status == FDB_SECTOR_STORE_USING ||
            sector.status == FDB_SECTOR_STORE_FULL) {
            if (sector.status == FDB_SECTOR_STORE_USING) {
                /* copy the current using sector status  */
                sector = db->cur_sec;
            }
            tsl.addr.index = sector.addr + SECTOR_HDR_DATA_SIZE;
            /* search all TSL */
            do {
                read_tsl(db, &tsl);
                /* iterator is interrupted when callback return pika_true */
                if (cb(&tsl, arg)) {
                    db_unlock(db);
                    return;
                }
            } while ((tsl.addr.index = get_next_tsl_addr(&sector, &tsl)) !=
                     FAILED_ADDR);
        }
    } while ((sec_addr = get_next_sector_addr(db, &sector, traversed_len)) !=
             FAILED_ADDR);
    db_unlock(db);
}

/**
 * The TSDB iterator for each TSL.
 *
 * @param db database object
 * @param cb callback
 * @param arg callback argument
 */
void fdb_tsl_iter_reverse(fdb_tsdb_t db, fdb_tsl_cb cb, void* cb_arg) {
    struct tsdb_sec_info sector;
    uint32_t sec_addr, traversed_len = 0;
    struct fdb_tsl tsl;

    if (!db_init_ok(db)) {
        FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
    }

    if (cb == NULL) {
        return;
    }

    sec_addr = db->cur_sec.addr;
    db_lock(db);
    /* search all sectors */
    do {
        traversed_len += db_sec_size(db);
        if (read_sector_info(db, sec_addr, &sector, pika_false) != FDB_NO_ERR) {
            continue;
        }
        /* sector has TSL */
        if (sector.status == FDB_SECTOR_STORE_USING ||
            sector.status == FDB_SECTOR_STORE_FULL) {
            if (sector.status == FDB_SECTOR_STORE_USING) {
                /* copy the current using sector status  */
                sector = db->cur_sec;
            }
            tsl.addr.index = sector.end_idx;
            /* search all TSL */
            do {
                read_tsl(db, &tsl);
                /* iterator is interrupted when callback return pika_true */
                if (cb(&tsl, cb_arg)) {
                    goto __exit;
                }
            } while ((tsl.addr.index = get_last_tsl_addr(&sector, &tsl)) !=
                     FAILED_ADDR);
        } else if (sector.status == FDB_SECTOR_STORE_EMPTY ||
                   sector.status == FDB_SECTOR_STORE_UNUSED)
            goto __exit;
    } while ((sec_addr = get_last_sector_addr(db, &sector, traversed_len)) !=
             FAILED_ADDR);

__exit:
    db_unlock(db);
}

/*
 * Found the matched TSL address.
 */
static int search_start_tsl_addr(fdb_tsdb_t db,
                                 int start,
                                 int end,
                                 fdb_time_t from,
                                 fdb_time_t to) {
    struct fdb_tsl tsl;
    while (pika_true) {
        tsl.addr.index =
            start + FDB_ALIGN((end - start) / 2, LOG_IDX_DATA_SIZE);
        read_tsl(db, &tsl);
        if (tsl.time < from) {
            start = tsl.addr.index + LOG_IDX_DATA_SIZE;
        } else if (tsl.time > from) {
            end = tsl.addr.index - LOG_IDX_DATA_SIZE;
        } else {
            return tsl.addr.index;
        }

        if (start > end) {
            if (from > to) {
                tsl.addr.index = start;
                read_tsl(db, &tsl);
                if (tsl.time > from) {
                    start -= LOG_IDX_DATA_SIZE;
                }
            }
            break;
        }
    }
    return start;
}

/**
 * The TSDB iterator for each TSL by timestamp.
 *
 * @param db database object
 * @param from starting timestamp. It will be a reverse iterator when ending
 * timestamp less than starting timestamp
 * @param to ending timestamp
 * @param cb callback
 * @param arg callback argument
 */
void fdb_tsl_iter_by_time(fdb_tsdb_t db,
                          fdb_time_t from,
                          fdb_time_t to,
                          fdb_tsl_cb cb,
                          void* cb_arg) {
    struct tsdb_sec_info sector;
    uint32_t sec_addr, start_addr, traversed_len = 0;
    struct fdb_tsl tsl;
    pika_bool found_start_tsl = pika_false;

    uint32_t (*get_sector_addr)(fdb_tsdb_t, tsdb_sec_info_t, uint32_t);
    uint32_t (*get_tsl_addr)(tsdb_sec_info_t, fdb_tsl_t);

    if (!db_init_ok(db)) {
        FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
    }

    if (from <= to) {
        start_addr = db_oldest_addr(db);
        get_sector_addr = get_next_sector_addr;
        get_tsl_addr = get_next_tsl_addr;
    } else {
        start_addr = db->cur_sec.addr;
        get_sector_addr = get_last_sector_addr;
        get_tsl_addr = get_last_tsl_addr;
    }

    //    FDB_INFO("from %s", ctime((const time_t * )&from));
    //    FDB_INFO("to %s", ctime((const time_t * )&to));

    if (cb == NULL) {
        return;
    }

    sec_addr = start_addr;
    db_lock(db);
    /* search all sectors */
    do {
        traversed_len += db_sec_size(db);
        if (read_sector_info(db, sec_addr, &sector, pika_false) != FDB_NO_ERR) {
            continue;
        }
        /* sector has TSL */
        if ((sector.status == FDB_SECTOR_STORE_USING ||
             sector.status == FDB_SECTOR_STORE_FULL)) {
            if (sector.status == FDB_SECTOR_STORE_USING) {
                /* copy the current using sector status  */
                sector = db->cur_sec;
            }
            if ((found_start_tsl) ||
                (!found_start_tsl &&
                 ((from <= to &&
                   ((sec_addr == start_addr && from <= sector.start_time) ||
                    from <= sector.end_time)) ||
                  (from > to &&
                   ((sec_addr == start_addr && from >= sector.end_time) ||
                    from >= sector.start_time))))) {
                uint32_t start = sector.addr + SECTOR_HDR_DATA_SIZE,
                         end = sector.end_idx;

                found_start_tsl = pika_true;
                /* search the first start TSL address */
                tsl.addr.index =
                    search_start_tsl_addr(db, start, end, from, to);
                /* search all TSL */
                do {
                    read_tsl(db, &tsl);
                    if (tsl.status != FDB_TSL_UNUSED) {
                        if ((from <= to && tsl.time >= from &&
                             tsl.time <= to) ||
                            (from > to && tsl.time <= from && tsl.time >= to)) {
                            /* iterator is interrupted when callback return
                             * pika_true
                             */
                            if (cb(&tsl, cb_arg)) {
                                goto __exit;
                            }
                        } else {
                            goto __exit;
                        }
                    }
                } while ((tsl.addr.index = get_tsl_addr(&sector, &tsl)) !=
                         FAILED_ADDR);
            }
        } else if (sector.status == FDB_SECTOR_STORE_EMPTY) {
            goto __exit;
        }
    } while ((sec_addr = get_sector_addr(db, &sector, traversed_len)) !=
             FAILED_ADDR);

__exit:
    db_unlock(db);
}

static pika_bool query_count_cb(fdb_tsl_t tsl, void* arg) {
    struct query_count_args* args = arg;

    if (tsl->status == args->status) {
        args->count++;
    }

    return pika_false;
}

/**
 * Query some TSL's count by timestamp and status.
 *
 * @param db database object
 * @param from starting timestamp
 * @param to ending timestamp
 * @param status status
 */
size_t fdb_tsl_query_count(fdb_tsdb_t db,
                           fdb_time_t from,
                           fdb_time_t to,
                           fdb_tsl_status_t status) {
    struct query_count_args arg = {FDB_TSL_UNUSED, 0};

    arg.status = status;

    if (!db_init_ok(db)) {
        FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
        return FDB_INIT_FAILED;
    }

    fdb_tsl_iter_by_time(db, from, to, query_count_cb, &arg);

    return arg.count;
}

/**
 * Set the TSL status.
 *
 * @param db database object
 * @param tsl TSL object
 * @param status status
 *
 * @return result
 */
fdb_err_t fdb_tsl_set_status(fdb_tsdb_t db,
                             fdb_tsl_t tsl,
                             fdb_tsl_status_t status) {
    fdb_err_t result = FDB_NO_ERR;
    uint8_t status_table[TSL_STATUS_TABLE_SIZE];

    /* write the status will by write granularity */
    _FDB_WRITE_STATUS(db, tsl->addr.index, status_table, FDB_TSL_STATUS_NUM,
                      status, pika_true);

    return result;
}

/**
 * Convert the TSL object to blob object
 *
 * @param tsl TSL object
 * @param blob blob object
 *
 * @return new blob object
 */
fdb_blob_t fdb_tsl_to_blob(fdb_tsl_t tsl, fdb_blob_t blob) {
    blob->saved.addr = tsl->addr.log;
    blob->saved.meta_addr = tsl->addr.index;
    blob->saved.len = tsl->log_len;

    return blob;
}

static pika_bool check_sec_hdr_cb(tsdb_sec_info_t sector,
                                  void* arg1,
                                  void* arg2) {
    struct check_sec_hdr_cb_args* arg = arg1;
    fdb_tsdb_t db = arg->db;

    if (!sector->check_ok) {
        FDB_INFO("Sector (0x%08" PRIX32 ") header info is incorrect.\n",
                 sector->addr);
        (arg->check_failed) = pika_true;
        return pika_true;
    } else if (sector->status == FDB_SECTOR_STORE_USING) {
        if (db->cur_sec.addr == FDB_DATA_UNUSED) {
            memcpy(&db->cur_sec, sector, sizeof(struct tsdb_sec_info));
        } else {
            FDB_INFO(
                "Warning: Sector status is wrong, there are multiple sectors "
                "in use.\n");
            (arg->check_failed) = pika_true;
            return pika_true;
        }
    } else if (sector->status == FDB_SECTOR_STORE_EMPTY) {
        (arg->empty_num) += 1;
        arg->empty_addr = sector->addr;
        if ((arg->empty_num) == 1 && db->cur_sec.addr == FDB_DATA_UNUSED) {
            memcpy(&db->cur_sec, sector, sizeof(struct tsdb_sec_info));
        }
    }

    return pika_false;
}
static pika_bool format_all_cb(tsdb_sec_info_t sector, void* arg1, void* arg2) {
    fdb_tsdb_t db = arg1;

    format_sector(db, sector->addr);

    return pika_false;
}

static void tsl_format_all(fdb_tsdb_t db) {
    struct tsdb_sec_info sector;

    sector.addr = 0;
    sector_iterator(db, &sector, FDB_SECTOR_STORE_UNUSED, db, NULL,
                    format_all_cb, pika_false);
    db_oldest_addr(db) = 0;
    db->cur_sec.addr = 0;
    db->last_time = 0;
    /* read the current using sector info */
    read_sector_info(db, db->cur_sec.addr, &db->cur_sec, pika_false);

    FDB_INFO("All sector format finished.\n");
}

/**
 * Clean all the data in the TSDB.
 *
 * @note It's DANGEROUS. This operation is not reversible.
 *
 * @param db database object
 */
void fdb_tsl_clean(fdb_tsdb_t db) {
    db_lock(db);
    tsl_format_all(db);
    db_unlock(db);
}

/**
 * This function will get or set some options of the database
 *
 * @param db database object
 * @param cmd the control command
 * @param arg the argument
 */
void fdb_tsdb_control(fdb_tsdb_t db, int cmd, void* arg) {
    FDB_ASSERT(db);

    switch (cmd) {
        case FDB_TSDB_CTRL_SET_SEC_SIZE:
            /* this change MUST before database initialization */
            FDB_ASSERT(db->parent.init_ok == pika_false);
            db->parent.sec_size = *(uint32_t*)arg;
            break;
        case FDB_TSDB_CTRL_GET_SEC_SIZE:
            *(uint32_t*)arg = db->parent.sec_size;
            break;
        case FDB_TSDB_CTRL_SET_LOCK:
#if !defined(__ARMCC_VERSION) && defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"
#endif
            db->parent.lock = (void (*)(fdb_db_t db))arg;
#if !defined(__ARMCC_VERSION) && defined(__GNUC__)
#pragma GCC diagnostic pop
#endif
            break;
        case FDB_TSDB_CTRL_SET_UNLOCK:
#if !defined(__ARMCC_VERSION) && defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"
#endif
            db->parent.unlock = (void (*)(fdb_db_t db))arg;
#if !defined(__ARMCC_VERSION) && defined(__GNUC__)
#pragma GCC diagnostic pop
#endif
            break;
        case FDB_TSDB_CTRL_SET_ROLLOVER:
            /* this change MUST after database initialized */
            FDB_ASSERT(db->parent.init_ok == pika_true);
            db->rollover = *(pika_bool*)arg;
            break;
        case FDB_TSDB_CTRL_GET_ROLLOVER:
            *(pika_bool*)arg = db->rollover;
            break;
        case FDB_TSDB_CTRL_GET_LAST_TIME:
            *(fdb_time_t*)arg = db->last_time;
            break;
        case FDB_TSDB_CTRL_SET_FILE_MODE:
#ifdef FDB_USING_FILE_MODE
            /* this change MUST before database initialization */
            FDB_ASSERT(db->parent.init_ok == pika_false);
            db->parent.file_mode = *(pika_bool*)arg;
#else
            FDB_INFO(
                "Error: set file mode Failed. Please defined the "
                "FDB_USING_FILE_MODE macro.");
#endif
            break;
        case FDB_TSDB_CTRL_SET_MAX_SIZE:
#ifdef FDB_USING_FILE_MODE
            /* this change MUST before database initialization */
            FDB_ASSERT(db->parent.init_ok == pika_false);
            db->parent.max_size = *(uint32_t*)arg;
#endif
            break;
        case FDB_TSDB_CTRL_SET_NOT_FORMAT:
            /* this change MUST before database initialization */
            FDB_ASSERT(db->parent.init_ok == pika_false);
            db->parent.not_formatable = *(pika_bool*)arg;
            break;
    }
}

/**
 * The time series database initialization.
 *
 * @param db database object
 * @param name database name
 * @param path FAL mode: partition name, file mode: database saved directory
 * path
 * @param get_time get current time function
 * @param max_len maximum length of each log
 * @param user_data user data
 *
 * @return result
 */
fdb_err_t fdb_tsdb_init(fdb_tsdb_t db,
                        const char* name,
                        const char* path,
                        fdb_get_time get_time,
                        size_t max_len,
                        void* user_data) {
    fdb_err_t result = FDB_NO_ERR;
    struct tsdb_sec_info sector;
    struct check_sec_hdr_cb_args check_sec_arg = {db, pika_false, 0, 0};

    FDB_ASSERT(get_time);

    result = _fdb_init_ex((fdb_db_t)db, name, path, FDB_DB_TYPE_TS, user_data);
    if (result != FDB_NO_ERR) {
        goto __exit;
    }

    db->get_time = get_time;
    db->max_len = max_len;
    /* default rollover flag is pika_true */
    db->rollover = pika_true;
    db_oldest_addr(db) = FDB_DATA_UNUSED;
    db->cur_sec.addr = FDB_DATA_UNUSED;
    /* must less than sector size */
    FDB_ASSERT(max_len < db_sec_size(db));

    /* check all sector header */
    sector.addr = 0;
    sector_iterator(db, &sector, FDB_SECTOR_STORE_UNUSED, &check_sec_arg, NULL,
                    check_sec_hdr_cb, pika_true);
    /* format all sector when check failed */
    if (check_sec_arg.check_failed) {
        if (db->parent.not_formatable) {
            result = FDB_READ_ERR;
            goto __exit;
        } else {
            tsl_format_all(db);
        }
    } else {
        uint32_t latest_addr;
        if (check_sec_arg.empty_num > 0) {
            latest_addr = check_sec_arg.empty_addr;
        } else {
            if (db->rollover) {
                latest_addr = db->cur_sec.addr;
            } else {
                /* There is no empty sector. */
                latest_addr = db->cur_sec.addr =
                    db_max_size(db) - db_sec_size(db);
            }
        }
        /* db->cur_sec is the latest sector, and the next is the oldest sector
         */
        if (latest_addr + db_sec_size(db) >= db_max_size(db)) {
            /* db->cur_sec is the the bottom of the database */
            db_oldest_addr(db) = 0;
        } else {
            db_oldest_addr(db) = latest_addr + db_sec_size(db);
        }
    }
    FDB_DEBUG("TSDB (%s) oldest sectors is 0x%08" PRIX32
              ", current using sector is 0x%08" PRIX32 ".\n",
              db_name(db), db_oldest_addr(db), db->cur_sec.addr);
    /* read the current using sector info */
    read_sector_info(db, db->cur_sec.addr, &db->cur_sec, pika_true);
    /* get last save time */
    if (db->cur_sec.status == FDB_SECTOR_STORE_USING) {
        db->last_time = db->cur_sec.end_time;
    } else if (db->cur_sec.status == FDB_SECTOR_STORE_EMPTY &&
               db_oldest_addr(db) != db->cur_sec.addr) {
        struct tsdb_sec_info sec;
        uint32_t addr = db->cur_sec.addr;

        if (addr == 0) {
            addr = db_max_size(db) - db_sec_size(db);
        } else {
            addr -= db_sec_size(db);
        }
        read_sector_info(db, addr, &sec, pika_false);
        db->last_time = sec.end_time;
    }

__exit:

    _fdb_init_finish((fdb_db_t)db, result);

    return result;
}

/**
 * The time series database deinitialization.
 *
 * @param db database object
 *
 * @return result
 */
fdb_err_t fdb_tsdb_deinit(fdb_tsdb_t db) {
    _fdb_deinit((fdb_db_t)db);

    return FDB_NO_ERR;
}

#endif /* defined(FDB_USING_TSDB) */
